[plan] Extract ResourceCache and PythonResourceBridge from AgentPlan#548
[plan] Extract ResourceCache and PythonResourceBridge from AgentPlan#548weiqingy wants to merge 3 commits intoapache:mainfrom
Conversation
|
Both CI failures are unrelated to our changes:
|
| public class ResourceCache implements AutoCloseable { | ||
|
|
||
| private final Map<ResourceType, Map<String, ResourceProvider>> resourceProviders; | ||
| private final Map<ResourceType, Map<String, Resource>> cache = new HashMap<>(); |
There was a problem hiding this comment.
Should we use ConcurrentHashMap here? For task submit by ctx.durableExecuteAsync may read/write this hashmap parallel.
There was a problem hiding this comment.
Good question! This is intentional. After the refactoring, ResourceCache is created and owned by ActionExecutionOperator.open(), so it's scoped to a single operator subtask. In Flink's execution model, all access (processElement, open, close) runs on the same mailbox thread. durableExecuteAsync dispatches work to external threads, but resource resolution from the cache happens on the operator thread before dispatch. So HashMap is sufficient and avoids the overhead of ConcurrentHashMap. Let me know if this matches your understanding.
There was a problem hiding this comment.
but resource resolution from the cache happens on the operator thread before dispatch
I think the resource resolution may not always happens before dispatch.
Take the built in chat action as an example. In chat action, we submit chat task to external threads, and the chat task will call chat method of BaseChatModelSetup, which occurs in an asynchronous thread. In the chat method of chat model setup, it will resolves the correspond connection, prompt and tools, which I think may lead to concurrent access to the cache.
There was a problem hiding this comment.
You're right, thanks for pushing back on this. I traced the code path more carefully:
ChatModelAction → durableExecuteAsync(callable) → async pool thread runs chatModel.chat() → BaseChatModelSetup.chat() calls this.getResource.apply() to resolve connection, prompt, and tools → which hits ResourceCache.getResource().
So resource resolution does happen on async threads, not just the mailbox thread. I'll switch back to ConcurrentHashMap. Good catch!
|
Thanks for the review, @wenjin272! I’ve updated the PR to apply the same separation on the Python side as well - could you please take another look? |
wenjin272
left a comment
There was a problem hiding this comment.
LGTM. Could you take a look at your convenience @xintongsong ?
|
I checked the CI failures - both are LLM-dependent e2e tests and don’t appear to be caused by this PR. Test 1 (react_agent_test): The output 4444 = 2123 + 2321 proves our ResourceCache IS working correctly — the chat model was resolved, the add tool was resolved and called successfully. The LLM (qwen3:1.7b) simply stopped after one tool call instead of continuing to call multiply(4444, 312). This is LLM non-determinism. Test 2 (long_term_memory_test): This runs on the Flink remote runner, where there's exactly ONE FlinkRunnerContext with ONE ResourceCache. The behavior is identical to before. The failure is assert len(doc) == 1 after LLM-based compaction using qwen3:8b — if the model's summarization response is malformed, compaction produces incorrect output. We can re-run CI to confirm flakiness — if it fails again with different assertion values, that would further support LLM non-determinism. @wenjin272 do you have access to re-run the CI tests? It looks like admin rights are required. |
|
Hi, @weiqingy, sorry for I don't have access to re-run the CI. I acknowledge that the failing test is due to its own flakiness and not caused by this PR. |
Linked issue: #547
Purpose of change
AgentPlan(624 lines) mixes plan definition, resource caching/resolution, Python bridge wiring, and serialization.This PR extracts two classes to separate concerns:
ResourceCache— lazy resource resolution, caching, and cleanup. Created by the operator inopen(), owned bythe operator lifecycle.
PythonResourceBridge— staticdiscoverPythonMCPResources()for Python MCP tool/prompt discovery. Calledduring operator init.
After extraction,
AgentPlanbecomes immutable after construction (~490 lines, down from 624). The removed publicmethods are
getResource(),close(), andsetPythonResourceAdapter().Tests
mvn test -pl plan— all plan module tests passmvn test -pl runtime— all runtime module tests pass./tools/lint.sh -c— formatting check passed./tools/ut.sh -j— full Java test suite passedAPI
Yes. Three public methods removed from
AgentPlan:getResource(String, ResourceType)— replaced byResourceCache.getResource()close()— replaced byResourceCache.close()setPythonResourceAdapter(PythonResourceAdapter)— replaced byPythonResourceBridge.discoverPythonMCPResources()Documentation
doc-neededdoc-not-neededdoc-included